package com.gitee.jastee.kafka.example;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Interceptor usage demo.
 * Measures the latency between producing a record to Kafka and consuming it,
 * i.e. (consume time - produce time), via the interceptors:
 * wiki.hadoop.kafka.Interceptor.JastProducerInterceptor
 * wiki.hadoop.kafka.Interceptor.JastConsumerInterceptor
 * (the producer interceptor presumably stamps "producer_time" into the record
 * value — confirm against the interceptor source, not visible here)
 * @Author jast
 * @Date 2020/4/19 下午2:28
 * @Version 1.0
 */
public class InterceptorDemo {

    private static final String topic = "test";
    private static final String group = "test-consumer";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Run the consumer on a background thread.
        CompletableFuture<Void> future = CompletableFuture.runAsync(InterceptorDemo::consumerData);
        // Wait 10 seconds for the consumer to connect and join its group.
        TimeUnit.SECONDS.sleep(10);
        // Produce the test data on another background thread.
        CompletableFuture.runAsync(() -> {
            try {
                sendData();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the executor thread can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });
        // Block until the consumer loop finishes.
        future.get();
        TimeUnit.SECONDS.sleep(10000);
    }

    /**
     * Consumes records and, for each JSON record carrying a "producer_time"
     * field, prints the produce-to-consume latency in milliseconds.
     */
    private static void consumerData() {
        InterceptorKafkaConsumerClient consumerClient = new InterceptorKafkaConsumerClient();
        KafkaConsumer<String, String> consumer = consumerClient.createConsumer(topic, group, 100, false);
        // Separate counter for printed samples: the original used (++i) on the
        // poll-loop index, which silently skipped poll iterations.
        long consumed = 0;
        for (int i = 0; i < 1000000; i++) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                if (JSONUtil.isJson(value)) {
                    JSONObject jsonObject = JSONUtil.parseObj(value);
                    Long producerTime = jsonObject.getLong("producer_time");
                    // Guard the unboxing: records without the field would NPE.
                    if (producerTime != null) {
                        String content = "从发送到消费消耗了: " + (System.currentTimeMillis() - producerTime) + " ms" + "  " + (++consumed);
                        System.out.println(content);
                    }
                }
            }
        }
    }

    /**
     * Sends 100000 identical JSON test records to the topic and prints the
     * total wall-clock time taken to enqueue them.
     *
     * @throws ExecutionException   if waiting on a send future fails (only when
     *                              the commented-out future.get() is enabled)
     * @throws InterruptedException if interrupted while waiting on a send future
     */
    private static void sendData() throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        InterceptorKafkaProducerClient producerClient = new InterceptorKafkaProducerClient();
        Producer<String, String> producer = producerClient.getProducer();
        JSONObject jo = new JSONObject();
        jo.set("key", "lalala");
        jo.set("timestamp", System.currentTimeMillis());
        for (int i = 0; i < 100000; i++) {
            String data = jo.toJSONString(0);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, data);
            Future<RecordMetadata> future = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // The callback fires for every send; exception is null on
                    // success. The original dereferenced it unconditionally and
                    // NPE'd on every successful send — only report real failures.
                    if (exception != null) {
                        System.out.println("这里异常了" + metadata + "," + exception.getMessage());
                        exception.printStackTrace();
                    }
                }
            });
            // For this test we rely on the async callback; in production wait
            // on the future (or batch-flush) to surface send failures:
//            future.get();
        }
        System.out.println("发送数据时间：" + (System.currentTimeMillis() - startTime));
    }

}
/**
 * Builds a String/String KafkaProducer with the custom producer interceptor
 * registered. One producer is created per client instance.
 */
class InterceptorKafkaProducerClient {

    // Instance-owned producer. The original declared this field static yet
    // assigned it in the constructor, so every new instance silently replaced
    // the producer shared by all earlier instances (and never closed it).
    private final Producer<String, String> producer;

    public InterceptorKafkaProducerClient() {
        Properties props = new Properties();
        // Register the custom producer interceptor; the list may hold several.
        List<String> interceptors = new ArrayList<String>();
        interceptors.add("wiki.hadoop.kafka.Interceptor.JastProducerInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        // NOTE(review): "localhost:9092" differs from the consumer client's
        // "10.16.0.2:9092" — confirm which broker list is intended.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
    }

    /**
     * @return the producer created by this client instance
     */
    public Producer<String, String> getProducer() {
        return producer;
    }

}

/**
 * Builds a String/String KafkaConsumer with the custom consumer interceptor
 * registered and subscribes it to a topic.
 */
class InterceptorKafkaConsumerClient {

    // NOTE(review): differs from the producer client's "localhost:9092" —
    // confirm which broker list is intended.
    private static final String KAFKA_BROKERS = "10.16.0.2:9092";

    /**
     * Creates and subscribes a consumer with auto-commit enabled and the
     * JastConsumerInterceptor registered.
     *
     * @param topic            topic to subscribe to
     * @param group            consumer group id
     * @param max_poll_records maximum number of records returned per poll()
     * @param isLatest         true to reset to "latest" offsets, false for "earliest"
     * @return a KafkaConsumer already subscribed to {@code topic}
     */
    public KafkaConsumer<String, String> createConsumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        // Register the custom consumer interceptor.
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "wiki.hadoop.kafka.Interceptor.JastConsumerInterceptor");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // cap each poll() batch
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // offsets auto-committed
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

}
